package com.navinfo.data.esper.cep.marketdatafeed;

import com.espertech.esper.client.*;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SampleClassThreading {

    public static void main(String[] args) throws InterruptedException {

        int numEvents = 1000000;
        int numThreads = 3;

        Configuration config = new Configuration();
        config.getEngineDefaults().getThreading()
                .setListenerDispatchPreserveOrder(false);
        config.getEngineDefaults().getThreading()
                .setInternalTimerEnabled(false);   // remove thread that handles time advancing
        EPServiceProvider engine = EPServiceProviderManager
                .getDefaultProvider(config);
        engine.getEPAdministrator().getConfiguration().addEventType(MyEvent.class);

        engine.getEPAdministrator().createEPL(
                "create context MyContext coalesce by consistent_hash_crc32(id) " +
                        "from MyEvent granularity 64 preallocate");
        String epl = "context MyContext select count(*) from MyEvent group by id";
        EPStatement stmt = engine.getEPAdministrator().createEPL(epl);
        stmt.setSubscriber(new MySubscriber());

        Thread[] threads = new Thread[numThreads];
        CountDownLatch latch = new CountDownLatch(numThreads);

        int eventsPerThreads = numEvents / numThreads;
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(
                    new MyRunnable(latch, eventsPerThreads, engine.getEPRuntime()));
        }
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < numThreads; i++) {
            threads[i].start();
        }

        latch.await(10, TimeUnit.MINUTES);
        if (latch.getCount() > 0) {
            throw new RuntimeException("Failed to complete in 10 minute");
        }
        long delta = System.currentTimeMillis() - startTime;
        System.out.println("Took " + delta + " millis");
    }

    public static class MySubscriber {
        public void update(Object[] args) {
        }
    }

    public static class MyRunnable implements Runnable {
        private final CountDownLatch latch;
        private final int numEvents;
        private final EPRuntime runtime;

        public MyRunnable(CountDownLatch latch, int numEvents, EPRuntime runtime) {
            this.latch = latch;
            this.numEvents = numEvents;
            this.runtime = runtime;
        }

        public void run() {
            Random r = new Random();
            for (int i = 0; i < numEvents; i++) {
                runtime.sendEvent(new MyEvent(r.nextInt(512)));
            }
            latch.countDown();
        }
    }

    public static class MyEvent {
        private final int id;

        public MyEvent(int id) {
            this.id = id;
        }

        public int getId() {
            return id;
        }
    }
}
